package com.bjy.qa.agent.transport.mqtt;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bjy.qa.agent.enumtype.CatalogType;
import com.bjy.qa.agent.tester.*;
import com.bjy.qa.agent.tools.ProcessCommandTool;
import com.bjy.qa.agent.tools.mqtt.MqttHelper;
import com.bjy.qa.agent.tools.mqtt.MqttQOS;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.testng.TestNG;
import org.testng.xml.XmlClass;
import org.testng.xml.XmlSuite;
import org.testng.xml.XmlTest;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class MqttClient implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(MqttClient.class);

    public static ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

    private static String downTopic; // 下行 的 topic 地址，例如：QaPlatform/DOWN/29002272-4659-4808-a804-08ce3388b136 这个是下发命令的 topic，agent.key=29002272-4659-4808-a804-08ce3388b136 的 agent
    private static String upTopic; // 上行 的 topic 地址，例如：QaPlatform/UP
    private static MqttHelper downMqttHelper = null; // 下行，接收命令的通道

    /**
     * 创建 mqtt 通道并监听处理远程控制命令
     * @param url mqtt 服务器url。例如：tcp://127.0.0.1:1883
     * @param qos mqtt 的 qos。QOS服务质量（默认LEVEL1）
     * @param baseTopic mqtt 的 topic 前缀。如果 baseTopic 传入 QaPlatform，subTopic 传入 29002272-4659-4808-a804-08ce3388b136，那完整的 topic 就是 QaPlatform/DOWN/29002272-4659-4808-a804-08ce3388b136
     * @param subTopic mqtt 的 topic 后缀。如果 baseTopic 传入 QaPlatform，subTopic 传入 29002272-4659-4808-a804-08ce3388b136，那完整的 topic 就是 QaPlatform/DOWN/29002272-4659-4808-a804-08ce3388b136
     * @param allowAnonymous 是否允许匿名连接
     * @param userName mqtt 的用户名
     * @param passWord mqtt 的密码
     * @throws MqttException
     */
    public void connectMqtt(String url, MqttQOS qos, String baseTopic, String subTopic, Boolean allowAnonymous, String userName, String passWord) throws MqttException {
        logger.info("连接 MQTT 通道: {url={}, qos: {}, baseTopic={}, subTopic={}, allowAnonymous={}, userName={}, passWord=******}", url, qos, baseTopic, subTopic, allowAnonymous, userName);
        downTopic = baseTopic + "/DOWN/" + subTopic;
        upTopic = baseTopic + "/UP";

        // 接收远程控制
        downMqttHelper = new MqttHelper();
        downMqttHelper.setQos(qos);
        if (allowAnonymous) {
            downMqttHelper.getInstance(url, "SUB:" + subTopic + "_" + System.currentTimeMillis());
        } else {
            downMqttHelper.getInstance(url, "SUB:" + subTopic + "_" + System.currentTimeMillis(), userName, passWord);
        }

        downMqttHelper.subscribe(new MqttHelper.MqttListener() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
            }

            @Override
            public void connectionLost(Throwable cause) {
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                logger.info("--> topic: {}", topic);
                logger.info("--> message: {}", message);

                if (topic.equals(downTopic)) {
                    cachedThreadPool.execute(() -> {
                        JSONObject jsonObject = JSON.parseObject(message.toString());

                        switch (jsonObject.getString("msg")) {
                            case "reboot":
                                logger.info("收到重启 agent 命令......");
                                String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; // 获取自己的进程ID

                                String system = System.getProperty("os.name").toLowerCase();
                                if (system.contains("win")) {
                                    // 生成 reboot.bat 脚本
                                    File temp = new File("../reboot.bat");
                                    try {
                                        String script = "echo off \r\n" +
                                                "taskkill /f /pid " + pid + " \r\n" +
                                                "cd .. \r\n" +
                                                "start.bat \r\n";
                                        temp.createNewFile();
                                        FileWriter fileWriter = new FileWriter(temp);
                                        fileWriter.write(script);
                                        fileWriter.close();
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }

                                    ProcessCommandTool.getProcessLocalCommandStr("start /min ..\\reboot.bat"); // 执行 reboot.bat
                                } else {
                                    // 生成 reboot.sh 脚本
                                    File temp = new File("../reboot.sh");
                                    try {
                                        String script = "#! /bin/bash \n" +
                                                "kill -9 " + pid + "\n" +
                                                "cd ../ \n" +
                                                "sleep 1s \n" +
                                                "sh ./start.sh\n";
                                        temp.createNewFile();
                                        FileWriter fileWriter = new FileWriter(temp);
                                        fileWriter.write(script);
                                        fileWriter.close();
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }

                                    ProcessCommandTool.getProcessLocalCommandStr("nohup sh ../reboot.sh > /dev/null 2>&1 &"); // 执行 reboot.sh
                                }
                                logger.info("重启 agent 完成");
                                break;
                            case "suite":
                            case "debug":
                            case "perfSuite":
                                List<JSONObject> cases = jsonObject.getJSONArray("cases").toJavaList(JSONObject.class);

                                // 给 step 添加公共步骤（为了节省传输大小，只下发了一份公共步骤，所以需要再把公共步骤添加回去）
                                if (jsonObject.getJSONArray("pubCases") != null) {
                                    List<JSONObject> pubCases = jsonObject.getJSONArray("pubCases").toJavaList(JSONObject.class);
                                    for (JSONObject caseInfo : cases) {
                                        List<JSONObject> steps = caseInfo.getJSONArray("steps").toJavaList(JSONObject.class);
                                        for (JSONObject stepInfo : steps) {
                                            JSONObject step = stepInfo.getJSONObject("step");
                                            if (step.getIntValue("stepType") == CatalogType.PUBLIC_STEP.getValue()) {
                                                for (JSONObject pubCase : pubCases) {
                                                    if (pubCase.getLongValue("cid") == step.getLongValue("apiId")) {
                                                        step.put("pubSteps", pubCase.getJSONArray("steps"));
                                                        step.put("extra1", pubCase.getString("name"));
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }

                                TestNG tng = new TestNG();
                                List<XmlSuite> suiteList = new ArrayList<>();
                                XmlSuite xmlSuite = new XmlSuite();

                                // 每个 case 生成一个 xmlTest（也就是testng.xml中的标签＜test＞）
                                for (JSONObject dataInfo : cases) {
                                    // 生成 xmlTest，并将 casesInfo 作为参数传入
                                    XmlTest xmlTest = new XmlTest(xmlSuite);
                                    Map<String, String> parameters = new HashMap<>();
                                    parameters.put("casesInfo", dataInfo.toJSONString());
                                    if (xmlSuite.getParameter("casesInfo") == null) {
                                        xmlSuite.setParameters(parameters);
                                    }
                                    xmlTest.setParameters(parameters);

                                    // 生成 xmlClass，并将 Tester 测试类放入（后续有可能需要根据不同的测试类型，生成不同的测试类）
                                    List<XmlClass> classes = new ArrayList<>();
                                    if ("suite".equalsIgnoreCase(jsonObject.getString("msg"))) {
                                        classes.add(new XmlClass(Tester.class));
                                    } else if ("debug".equalsIgnoreCase(jsonObject.getString("msg"))) {
                                        classes.add(new XmlClass(DebugTester.class));
                                    } else if ("perfSuite".equalsIgnoreCase(jsonObject.getString("msg"))) {
                                        classes.add(new XmlClass(PerfTester.class));
                                    }
                                    xmlTest.setXmlClasses(classes);
                                }

                                // 执行测试套件
                                suiteList.add(xmlSuite);
                                tng.setXmlSuites(suiteList);
                                tng.addListener(new SuiteListener());
                                tng.run();
                                break;
                            case "forceStopSuite": // 停止测试套件
                                List<JSONObject> caseList = jsonObject.getJSONArray("cases").toJavaList(JSONObject.class);
                                int type = jsonObject.getIntValue("type");
                                for (JSONObject aCase : caseList) {
                                    int resultId = aCase.getIntValue("rid");
                                    int caseId = aCase.getIntValue("cid");
                                    TaskManager.forceStopSuite(type, resultId, caseId);
                                }
                                break;
                        }
                    });
                } else {
                    logger.info("未识别的 topic。topic：{}, message: {}", topic, message);
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
            }
        }, downTopic);
    }

    /**
     * 通过 mqtt 的 上行 topic 推送消息到服务器
     * @param upMsg 需要发送的消息
     * @throws MqttException
     * @throws InterruptedException
     */
    public static void pub(JSONObject upMsg) throws MqttException, InterruptedException {
        downMqttHelper.publish(upTopic, JSON.toJSONString(upMsg));
    }

    /**
     * close 方法，释放资源
     * @throws MqttException
     */
    public void close() throws MqttException {
        if (downMqttHelper != null) {
            logger.info("断开 MQTT 通道");
            downMqttHelper.close(); // 关闭 接收远程控制的 mqtt 连接
        }
    }
}
